Theta V live streaming using Pi 3

Here is my code.It is not cleaned yet and I maybe have made some mistakes but if you know Python, this should by OK. This should work on a Pi Zero W, the CPU usage is about 20% on a Pi 3B+.
The theta (lastest firmware) is connected in client mode, it use arp-scan to find the theta on the network (not sure it’s the best but it works) .In this mode you need to use a HTTPDigestAuth.
The code is devided in 2 files : One for connection ,parameters,file copy and one for the streaming process.

ThetaV1.py

import threading, sys, time, subprocess
import	 tty, termios, socket
import requests,json
from threading import Thread
from requests.auth import HTTPDigestAuth
from multiprocessing import Process
from multiprocessing import Value

sys.path.append('/home/pi/cyclope')
from ThetaStreamServerV1 import ThetaStreamServer
from ThetaStreamServerV1 import ThetaHandler
from RepertoireV1 import Repertoire
	
	
class Theta(Thread):

	HEADERS = {'content-type': 'application/json'}
	ALERTES = ["Caméra 360 non connectée"]
	ALERTES_COPIE = ["Traitement fichier 360 en cours"]
	AUTH = HTTPDigestAuth('THETAYL00108440', '00108440')
	ARP = "sudo arp-scan -I wlan0 10.0.1.10-10.0.1.50"
	RICOH = "RICOH COMPANY"
	FICHIER_CORRESPONDANCE = "/home/pi/cyclope/fichiersTheta.json"
	
	def __init__(self,tempsBoucle=3,tempsReconnexion=3):
		
		super(Theta, self).__init__()
		self.stopRequest = threading.Event()
		self.tempsBoucle = tempsBoucle
		self.tempsReconnexion = tempsReconnexion
		self.adresseIp = ""
		self.alertes = Theta.ALERTES
		self.niveauBatterie = 0.
		self.fichiersThetaJson = json.loads(' { "fichiers" : [] }')
		self.recordingIndicator = False
		self.nomFichier = ""
		self.list = []
		""" Modes de la theta :
			0 : adresse IP non trouvée
			1 : IP trouvee , serveur stream demare , mais pas de connexion active
			2 : au repos, live autorise
			3 : en enregistrement
			4 : en transfert de fichier
		"""
		self.mode = Value('i',0)
		self.lireFichiersTheta()
		
		self.methodeSend360 = "0"
		self.listefichiersSend360 = ""

	def getAlertes(self):
		return self.alertes
		
	def getMode(self):
		return self.mode.value

	def getAdresseIp(self):
		output = (subprocess.check_output(Theta.ARP, shell=True )).decode("utf-8").split("\n")
		for line in output:
			if (line.find(Theta.RICOH) != -1):
				self.adresseIp = line.split()[0]
				with self.mode.get_lock():
					self.mode.value = 1
				self.startLive()
					
					
	def _live(self):	
		print("_live")
		serveurLive =  ThetaStreamServer(('',9091),ThetaHandler)	
		serveurLive.initChargement(self.adresseIp,self.mode)
		serveurLive.serve_forever()


	def startLive(self):
		try:
			print("startLive")
			self.p = Process(target=self._live)
			self.p.start()
		except Exception as err :
			print("EXCEPTION startLive")
			print(err)
			pass

	def setOptions1(self):
		url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))		
		body = json.dumps({"name": "camera.setOptions",	"parameters": {	"options": {
							"captureMode": "video",	"sleepDelay": 1200, "offDelay": 600, "videoStitching" : "none",
								"_microphoneChannel": "1ch", "_gain" : "mute",	"_shutterVolume" : 100,
								"previewFormat" : {"width": 1024, "height": 512, "framerate": 8}}}})
		try:
			req = requests.post(url, data=body,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=3)
		except Exception:
			pass

	def setOptions2(self):
		url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))
		
		body = json.dumps({"name": "camera.setOptions",	"parameters": {	"options": {							
								"fileFormat": {"type": "mp4", "width": 1920, "height": 960, "_codec": "H.264/MPEG-4 AVC"},
								"_maxRecordableTime" : 1500, "_bitrate" : "Normal",	"whiteBalance": "auto"	}}})
		try:
			req = requests.post(url, data=body,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=3)
		except Exception:
			pass				
	

	def startRecording(self,nomFichier):		
		if (self.mode.value == 2):
			url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))		
			body = json.dumps({	"name": "camera.startCapture"})
			try:
				req = requests.post(url, data=body,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=3)
				self.recordingIndicator = True
				self.nomFichier = nomFichier
				with self.mode.get_lock():
					self.mode.value = 3
			except Exception:
				pass
			
			

	def stopRecording(self):
		if (self.mode.value == 3):
			url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))		
			body = json.dumps({"name": "camera.stopCapture"})
			try:
				req = requests.post(url, data=body,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=3)
				print(req.content.decode('UTF-8'))
				with self.mode.get_lock():
					self.mode.value = 2
			except Exception:
				pass		

			
	def getState(self):
		url = "".join(("http://", self.adresseIp, ":80/osc/state"))
		print ("THETA getState\n")
		try:
			req = requests.post(url,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=10)
			if (req.status_code == 200):
				if (self.mode.value == 1):
					self.setOptions1()
					self.setOptions2()			
					with self.mode.get_lock():
						self.mode.value = 2
					
			
				donneeJson = json.loads(req.content.decode('UTF-8'))
				self.niveauBatterie = donneeJson["state"]["batteryLevel"]
				#TODO erreur internes + alrte temps enregistrment
				self.alertes = []
				print("req.status_code 200")
				#if (self.mode.value == 4):
				#	print("Theta.ALERTES_COPIE")
				#	self.alertes=Theta.ALERTES_COPIE
					
				capture = donneeJson["state"]["_captureStatus"]
				fileUrl = donneeJson["state"]["_latestFileUrl"]
				recordableTime= donneeJson["state"]["_recordableTime"]
				print("recordableTime")
				print(recordableTime)
				print("capture")
				print(capture)
				print(fileUrl)
				if ((self.recordingIndicator == True) and (capture == "idle")):	
					print(fileUrl)
					fileUrlSplit=fileUrl.split("/")

					self.ajoutFichiersTheta(fileUrlSplit[-1])
					self.nomFichier = ""
					self.recordingIndicator = False
				#if (capture == "shooting"):

		
		except Exception as err:
				print ("THETA getState Exception \n")
				print(err)
				with self.mode.get_lock():
					self.mode.value = 1
				self.alertes = Theta.ALERTES
				self.niveauBatterie = 0.
	
			
			

	
			
			
	def lireFichiersTheta(self):
		print("lireFichiersTheta")
		date = time.strftime("%Y-%m-%d",time.localtime(time.time()-60*60*24*30))
		try:
			with open(Theta.FICHIER_CORRESPONDANCE, "rt") as fichier:
				listeAvantTri = fichier.read()
				listeAvantTriJson = json.loads(listeAvantTri)
				for chunk in listeAvantTriJson["fichiers"]:
					if (chunk["date"] > date):
						self.fichiersThetaJson["fichiers"].append(chunk)			
		except Exception:
			print("Exception lireFichiersTheta")
			pass
			
			
	def ecrireFichiersTheta(self):
		print("ecrireFichiersTheta")
		try:
			with open(Theta.FICHIER_CORRESPONDANCE, "wt") as fichier:
				#fichier.write("[ ")
				#fichier.write(",".join(self.fichiersThetaJson))
				#fichier.write("]")		
				fichier.write( json.dumps(self.fichiersThetaJson))
		except Exception:
			print("Exception ecrireFichiersTheta")
			pass	
	
	
	def ajoutFichiersTheta(self,name):
		print("ajoutFichiersTheta")
		print(name)
		date = time.strftime("%Y-%m-%d",time.localtime())
		#ajout = json.dumps({ "name" : name , "nom" : self.nomFichier , "date" : date })
		ajout = { "name" : name , "nom" : self.nomFichier, "date" :  date  }
		if (self.nomFichier != ''):
			self.fichiersThetaJson["fichiers"].append(ajout)
			self.ecrireFichiersTheta()
			




	def join (self, timeout=2):
		self.p.join(timeout)
		self.stopRequest.set()
		super(Theta, self).join(timeout)			
			

	def listFiles(self,targetDir,espaceDisponible):
		print("listFiles")
		print(targetDir)
		print((espaceDisponible))
		listeFichiersJson = json.loads(' { "fichiers" : [], "espaceDisponible" : "", "message" : "" }')
		listeFichiers = ""
		
		message = ""
		limit = 0
		
		if (targetDir == Repertoire.CHEMIN_CARTE):
			message = "Clé USB non présente"
			limit = 1
		
		
		
		if (self.mode.value == 2):
			url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))
			body = json.dumps({"name": "camera.listFiles",
						"parameters": {
								"fileType": "all",
								"entryCount": 128,
								"_detail" : False,
								"maxThumbSize": 0
						}
			})
			try:
				req = requests.post(url, data=body,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=3)
				listeFichiers = req.content.decode('UTF-8')				
				entries = json.loads(listeFichiers)["results"]["entries"]
				self.list = entries
				for chunck in entries:
						name = chunck["name"]
						info = chunck["name"] + " (" + str(int(chunck["size"]/1000000)) + " Mo)"
						for chunck2 in self.fichiersThetaJson["fichiers"]:
							if (name == chunck2["name"]):
								info = chunck2["nom"] + " (" + str(int(chunck["size"]/1000000)) + " Mo)"
							
						ajout = { "name" : name , "info" : info, "size" : int(chunck["size"]/1000000) }
						listeFichiersJson["fichiers"].append(ajout)
			except Exception:
				pass		
					
		else:
			limit=2
			message = "Caméra 360 non disponible"
		
		print("listFiles2")
		listeFichiersJson["espaceDisponible"]=int(espaceDisponible)/1000 -1
		listeFichiersJson["message"]=message
		listeFichiersJson["limit"]=limit
		print("listFiles3")
		return json.dumps(listeFichiersJson)


	def sendFichiers360(self,methode,listefichiers):
		self.methodeSend360 = methode
		self.listefichiersSend360 = listefichiers


	def _sendFichiers360(self):
		print("sendFichiers360")
		methode= self.methodeSend360
		self.methodeSend360 = "0"
		print(methode)
		print(self.methodeSend360)
		self.alertes=Theta.ALERTES_COPIE
		for x in self.listefichiersSend360.split("_"):
			fileUrl = ""
			nomFichierDestination = x
			for y in self.list:
				if (y["name"] == x):
					fileUrl= y["fileUrl"]
			for z in self.fichiersThetaJson["fichiers"]:
				if (z["name"] == x):
					nomFichierDestination= z["nom"]
			
			if (fileUrl != ""):
				if (methode == "1"):				
					self.deleteFile(fileUrl)
				if (methode == "2"):
					print(nomFichierDestination)
					self.getFile(fileUrl,Repertoire.CHEMIN_CLE,nomFichierDestination)
				if (methode == "3"):	
					result = self.getFile(fileUrl,Repertoire.CHEMIN_CLE,nomFichierDestination)
					if (result == 0):
						self.deleteFile(fileUrl)



	def deleteFile(self,fileUrl):
		print ("deleteFile")
		print (fileUrl)
		fileUrls = []
		fileUrls.append(fileUrl)
		url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))
		body = json.dumps({"name": "camera.delete",
						"parameters": {
								"fileUrls": 	fileUrls		}
			})
		try:
			req = requests.post(url, data=body,headers=Theta.HEADERS,auth=Theta.AUTH,timeout=5)	
			print (str(req.status_code))
		except Exception:
			pass
		
	def getFile(self,fileUrl,repertoireDestination,nomFichierDestination):
		print("getFile")
		print("fileUrl:" + fileUrl)
		print("repertoireDestination:" + repertoireDestination)
		print("nomFichierDestination:" + nomFichierDestination)
		exit_code = 0
		sequence = (repertoireDestination,"/",nomFichierDestination)
		fichier = "".join(sequence)
		
		print ("fichier:"  + fichier)
		
		if (self.mode.value == 2):
			with self.mode.get_lock():
					self.mode.value = 4
					
			#self.alertes = Theta.ALERTES_COPIE
			try:
				print ("getFile : " + fileUrl + " vers " + fichier)
				mon_fichier = open(fichier, "wb")
						
				response = requests.get(fileUrl,headers=Theta.HEADERS,auth=Theta.AUTH,stream=True,timeout=5)
				if response.status_code == 200:
					for block in response.iter_content(chunk_size=100000):
						if block:
							mon_fichier.write(block)
				
				mon_fichier.close()
				#self.alertes = []
			except Exception as err:
				print("Exception getFile")
				print (err)
				#self.alertes = ["Exception getFile"]
				exit_code = 1
				pass
			
			with self.mode.get_lock():
					self.mode.value = 2
			return exit_code
			
	
	
	def run(self) :		
		while not self.stopRequest.is_set():		
			if (self.mode.value == 0):
				self.getAdresseIp()	
		
			if (self.mode.value > 0):
				self.getState()
				if (self.methodeSend360 != "0"):
					self._sendFichiers360()

					
			if (self.mode.value == 0):
				time.sleep(self.tempsReconnexion)
			else:
				time.sleep(self.tempsBoucle)

ThetaStreamServerV1.py

import threading, sys, time, subprocess
import	 tty, termios, socket
import requests,json
from threading import Thread
from requests.auth import HTTPDigestAuth
from multiprocessing import Process
from multiprocessing import Value
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn

sys.path.append('/home/pi/cyclope')

class ThetaStreamServer(ThreadingMixIn, HTTPServer):

def initChargement(self,adresseIp,mode):
	self.nbClients = Value('i',0)
	self.thetaGetLivePreview = ThetaGetLivePreview(adresseIp,mode,self.nbClients)
	self.mode = mode
	self.thetaGetLivePreview.start()	
		
class ThetaHandler(BaseHTTPRequestHandler):

def do_GET(self):
	print("do_GET")
	with self.server.nbClients.get_lock():
		self.server.nbClients.value += 1
		print("nbClients :"		+ str(self.server.nbClients.value))
	try:	
		time.sleep(1)
		self.send_response(200)
		self.send_header('Access-Control-Allow-Origin', '*')
		self.send_header('Content-type', 'multipart/x-mixed-replace; boundary=--osclive')
		self.send_header("Pragma-directive", "no-cache")
		self.send_header("Cache-directive", "no-cache")
		#self.send_header("Cache-control", "no-cache")
		self.send_header("Cache-control", "no-cache, no-store, must-revalidate")
		self.send_header("Pragma", "no-cache")
		#self.send_header("Expires", "0")

		image=b''
		timeStampImage = ''
		
		#while (self.server.mode.value == 2 ):
		while True:
			if (timeStampImage != self.server.thetaGetLivePreview.timeStampImage):
				#print("NEW IMAGE SENT")
				timeStampImage = self.server.thetaGetLivePreview.timeStampImage
				self.end_headers()
				image= self.server.thetaGetLivePreview.image
				self.wfile.write('--osclive'.encode())
				self.end_headers()
				self.send_header('Content-type', 'image/jpeg')
				self.send_header('Content-length', len(image))
				self.end_headers()
				self.wfile.write(image)
			time.sleep(0.1)
		
		with self.server.nbClients.get_lock():
			self.server.nbClients.value = self.server.nbClients.value - 1
			
		print ("end DO GEtttttttttttttttttttttttttttt")
	except Exception as err:			
		print("Exception do_GET")
		print(err)
		with self.server.nbClients.get_lock():
			self.server.nbClients.value = self.server.nbClients.value - 1
		

		

class ThetaGetLivePreview(Thread):

def __init__(self, adresseIp,mode,nbClients):
	print("init ThetaGetLivePreview")
	super(ThetaGetLivePreview, self).__init__()
	self.stopRequest = threading.Event()
	self.adresseIp = adresseIp
	self.image=b''
	self.timeStampImage=time.time()
	self.isChargingImage = False
	self.mode = mode
	self.nbClients = nbClients

def _appel(self):
	print("_appel ThetaGetLivePreview")
	print(str(self.mode.value))
	
	url = "".join(("http://", self.adresseIp, ":80/osc/commands/execute"))
	body = json.dumps({"name": "camera.getLivePreview"})
	try:
		response = requests.post(url, data=body, headers={'content-type': 'application/json'},auth=HTTPDigestAuth('THETAYL00108440', '00108440'), stream=True,timeout=5)
		print("chargement _appel - response")
		if response.status_code == 200:
			self.isChargingImage = True
			bytes = ''
			jpg=''
			i = 0
			for block in response.iter_content(chunk_size=10000):
				if (self.nbClients.value == 0):
						print("ARRET _appel")
						response.close()
						raise StopIteration()
						
				if (bytes == ''):
					bytes = block
				else: 	
					bytes = bytes + block
				
				# Search the current block of bytes for the jpq start and end
				a = bytes.find(b'\xff\xd8')
				b = bytes.find(b'\xff\xd9')

				# If you have a jpg
				if a != - 1 and b != -1:
					print("image - chargement")
					self.image = bytes[a:b + 2]
					self.timeStampImage=time.time()
					bytes = bytes[b + 2:]					
					
		else:
			print("theta response.status_code _appel: {0}".format(response.status_code))
			response.close()
			self.isChargingImage = False	
	except Exception as err:
		print("theta erreur _appel: {0}".format(err))
		self.isChargingImage = False		

def run(self):
	while not self.stopRequest.is_set():
		if (self.mode.value == 2 and self.nbClients.value > 0):
			if (self.isChargingImage == False):
				self._appel()
		else:
			self.isChargingImage = False	
		time.sleep(0.5)

The Javascript part will come soon (the code quality is worth than the python ones). Any browser should view the video (but in equirectangular) .

Hugues

3 Likes